
package com.shiku.imserver.repository.runnable;


import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.shiku.imserver.common.message.ChatMessage;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ChatMessageDBRunnable
        extends BaseRepositoryMapRunnable<List<Document>> {
    private MongoDatabase chatMsgDB;
    protected ConcurrentHashMap<String, MongoCollection<Document>> dbMaps = new ConcurrentHashMap<>();
    protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private static Logger log = LoggerFactory.getLogger(ChatMessageDBRunnable.class);


    public ChatMessageDBRunnable(MongoDatabase chatMsgDB) {

        this.chatMsgDB = chatMsgDB;

    }


    public MongoCollection<Document> getMongoCollection(String collectionName) {

        MongoCollection<Document> collection = this.dbMaps.get(collectionName);

        if (null == collection) {
            collection = this.chatMsgDB.getCollection(collectionName);
        }

        collectionName = null;

        return collection;

    }


    public void putMessageToTask(ChatMessage message) {

        Document dbObj = new Document();

        try {

            dbObj.put("message", message.toString());


            dbObj.put("direction", Integer.valueOf(0));

            if (!message.getFromUserId().equals(message.getToUserId())) {

                dbObj.put("sender", message.getFromUserId());

                dbObj.put("receiver", message.getToUserId());

            } else {

                dbObj.put("sender", message.getMessageHead().getFrom());

                dbObj.put("receiver", message.getMessageHead().getTo());

            }

            dbObj.put("sender_jid", message.getMessageHead().getFrom());


            dbObj.put("receiver_jid", message.getMessageHead().getTo());


            dbObj.put("ts", Long.valueOf(System.currentTimeMillis()));

            dbObj.put("contentType", Short.valueOf(message.getType()));

            dbObj.put("messageId", message.getMessageHead().getMessageId());

            dbObj.put("timeSend", Long.valueOf(message.getTimeSend()));

            dbObj.put("deleteTime", Long.valueOf(message.getDeleteTime()));

            dbObj.put("isRead", Integer.valueOf(0));

            dbObj.put("isEncrypt", Boolean.valueOf(message.isEncrypt()));

            if (null != message.getContent()) {

                dbObj.put("content", message.getContent());

            }

        } catch (Exception e) {

            e.printStackTrace();

        }


        String key = null;

        try {

            key = getCollectionName(message.getFromUserId());

        } catch (NumberFormatException e) {

            log.error("error {} ===> {} ", e.getMessage(), message.toString());

            return;

        } catch (Exception e) {

            log.error(e.getMessage(), e);


            return;

        }


        Document toDbObj = Document.parse(dbObj.toJson());

        toDbObj.replace("direction", Integer.valueOf(1));

        if (!message.getFromUserId().equals(message.getToUserId())) {

            toDbObj.replace("sender", message.getToUserId());

            toDbObj.replace("receiver", message.getFromUserId());

            try {

                this.lock.writeLock().lock();


                List<Document> list = (List<Document>) this.maps.get(key);

                if (null == list) {
                    list = Collections.synchronizedList(new LinkedList<>());
                }

                list.add(dbObj);

                addMsg(key, list);


                String tokey = null;

                try {

                    tokey = getCollectionName(message.getToUserId());

                } catch (NumberFormatException e) {

                    log.error("error {} ===> {} ", e.getMessage(), message.toString());

                    return;

                } catch (Exception e) {

                    log.error(e.getMessage(), e);

                    return;

                }

                List<Document> tolist = (List<Document>) this.maps.get(tokey);

                if (null == tolist) {
                    tolist = Collections.synchronizedList(new LinkedList<>());
                }

                tolist.add(toDbObj);

                addMsg(tokey, tolist);

            } catch (Exception e) {

                log.error(e.getMessage());

            } finally {

                this.lock.writeLock().unlock();

            }

        } else {


            toDbObj.put("sender", message.getMessageHead().getTo());

            toDbObj.put("receiver", message.getMessageHead().getFrom());

            try {

                this.lock.writeLock().lock();

                List<Document> list = (List<Document>) this.maps.get(key);

                if (null == list) {
                    list = Collections.synchronizedList(new LinkedList<>());
                }

                list.add(dbObj);

                list.add(toDbObj);

                addMsg(key, list);

            } catch (Exception e) {

                log.error(e.getMessage());

            } finally {

                this.lock.writeLock().unlock();

            }

        }

    }


    @Override
    public void runTask() {

        try {

            MongoCollection<Document> collection = null;

            List<Document> list = null;

            Iterator<Map.Entry<String, List<Document>>> iterator = null;


            iterator = this.maps.entrySet().iterator();

            Map.Entry<String, List<Document>> entry = null;

            while (iterator.hasNext()) {


                try {

                    entry = iterator.next();

                    list = entry.getValue();

                    this.lock.writeLock().lock();

                    if (!list.isEmpty()) {

                        collection = getMongoCollection(entry.getKey());

                        collection.insertMany(list);

                        list.clear();

                    } else {


                        iterator.remove();

                        list = null;

                    }


                } catch (Exception e) {

                    log.error(e.toString(), e);

                } finally {

                    this.lock.writeLock().unlock();

                }

            }

        } catch (Exception e) {

            log.error(e.toString(), e);

        } finally {
        }

    }

}


